package com.uxsino.reactorq.commons;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.BlobMessage;
import org.apache.commons.lang3.StringUtils;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;

/**
 * JMS Message as {@link Flux}
 * it convert the acutal message type from jms message, and call subscribers with it.
 * 
 * @param <T> the actual message type. if T is a {@link javax.jms.Message}, the raw jms message will be passed
 * POJO object is transfered as json string. 
 * String object transfered as is.
 * if T is a {@link IWithAttachmentStream}, an attachement will be passed with it
 * if T implements {@link IMessageWithReceiver}, the receiverId property will be set accordingly
 * 
 * see {@link TopicFlux}, {@link QueueFlux}
 */
public abstract class JMSFlux<T> extends Flux<T> {
    private static Logger logger = LoggerFactory.getLogger(JMSFlux.class);

    protected Connection connection = null;

    protected Session session = null;

    protected Destination destination = null;

    protected String destinationName;

    protected MessageConsumer msgConsumer;

    protected Class<? extends T> msgClass;

    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    private org.reactivestreams.Subscriber<? super T> subscriber;

    private BiConsumer<javax.jms.Message, T> postDecodeAction = null;

    private ObjectMapper mapper = new ObjectMapper();

    protected String receiverId;

    protected boolean acknowledge = false;

    protected JMSFlux(Connection connection, String destName, Class<? extends T> msgClass, String receiverId)
        throws JMSException {
        this.connection = connection;
        this.receiverId = receiverId;
        connection.start();
        this.msgClass = msgClass;
        this.destinationName = destName;
        openSession();
        if (null != msgConsumer) {
            try {
                msgConsumer.setMessageListener(this::onMessage);
            } catch (JMSException e) {
                logger.error("error listening to queue. {}", e);
            }
        }

    }

    protected JMSFlux(Connection connection, String destName, Class<? extends T> msgClass, String receiverId,
        boolean acknowledge) throws JMSException {
        this.connection = connection;
        this.receiverId = receiverId;
        connection.start();
        this.msgClass = msgClass;
        this.destinationName = destName;
        this.acknowledge = acknowledge;
        openSession();
        if (null != msgConsumer) {
            try {
                msgConsumer.setMessageListener(this::onMessage);
            } catch (JMSException e) {
                logger.error("error listening to queue. {}", e);
            }
        }

    }

    public static final class JMSFluxSubscription implements Subscription {

        private Consumer<JMSFluxSubscription> cancelAction;

        public JMSFluxSubscription(Consumer<JMSFluxSubscription> cancelAction) {
            this.cancelAction = cancelAction;
        }

        @Override
        public void request(long n) {
            // how to deal with request?
        }

        @Override
        public void cancel() {
            if (cancelAction != null)
                cancelAction.accept(this);

        }

    }

    /**
     * get an object from json encoded message
     * 
     * @param cls
     * @return
     */
    @SuppressWarnings("unchecked")
    public <ObjectType> Flux<ObjectType> getJsonDecodedFlux(Class<?> cls) {
        return filter(msg -> msg instanceof javax.jms.TextMessage).map(msg -> {
            String s = null;
            try {
                s = ((javax.jms.TextMessage) msg).getText();
                return (ObjectType) mapper.readValue(s, cls);
            } catch (JMSException e) {
                logger.error("error converting message: {}", e);
            } catch (IOException e) {
                logger.error("error parsing json message: {}\n{}", s, e);
            }
            return null;
        });

    }

    /**
     * get the message text from various jms message type
     * @param msg
     * @return
     * @throws JMSException
     */
    private String getMessageText(javax.jms.Message msg) throws JMSException {
        if (msg instanceof TextMessage) {
            return ((TextMessage) msg).getText();
        }
        return msg.getStringProperty("text");
    }

    /**
     * call back for jms messge received
     * @param msg
     */
    @SuppressWarnings("unchecked")
    private void onMessage(javax.jms.Message msg) {
        try {
            String s = getMessageText(msg);
            if (acknowledge) {
                // 确认接收，并成功处理了消息，对Session.CLIENT_ACKNOWLEDGE有效
                msg.acknowledge();
                logger.debug("确认接收到Queue消息：{}", s);
            }
            if (subscriber != null) {
                if (javax.jms.Message.class.isAssignableFrom(msgClass)) {
                    // if the raw message is T
                    processDecoded(msg, (T) msg);
                } else if (msgClass == String.class) {
                    // if String is T
                    processDecoded(msg, (T) s);
                } else if (msgClass == JSONObject.class) {
                    // alibaba JOSONObject
                    processDecoded(msg, (T) JSONObject.parseObject(s));
                } else {
                    // default fastxml json object
                    processDecoded(msg, mapper.readValue(s, msgClass));
                }
            }
        } catch (IOException e) {
            logger.error("error parsing json message. {}", e);
        } catch (JMSException e) {
            logger.error("error reading jms message. {}", e);
        }

    }

    /**
     * proccess the decoded message, in actual type
     * @param msg
     * @param t
     */
    private void processDecoded(javax.jms.Message msg, T t) {

        if (t instanceof IWithAttachmentStream && msg instanceof BlobMessage) {
            try {
                ((IWithAttachmentStream) t).setAttachement(((BlobMessage) msg).getInputStream());
            } catch (IOException | JMSException e) {
                logger.error("error getting attachment.{}");
            }
        }
        if (postDecodeAction != null) {
            postDecodeAction.accept(msg, t);
        }
        try {
            subscriber.onNext(t);
        } catch (Exception e) {
            logger.error("error sending {}\n{}", msgClass, t);
            throw e;
        }
    }

    /**
     * a callback for scenario that the raw jms message is needed to tweak the actual message
     * @param action
     */
    public void onPostDecode(BiConsumer<javax.jms.Message, T> action) {
        postDecodeAction = action;
    }

    protected abstract Session openSession(Connection conn) throws JMSException;

    protected abstract Destination openDestination(Session session) throws JMSException;

    protected abstract MessageConsumer openMessageConsumer(Session session, String selector) throws JMSException;

    /**
     * open the jms session
     */
    private void openSession() {
        String selector = null;
        if (!StringUtils.isEmpty(this.receiverId)) {
            selector = "receiverId IS NULL OR receiverId = '" + this.receiverId + "'";
        }
        try {
            session = openSession(connection);
            destination = openDestination(session);
            msgConsumer = openMessageConsumer(session, selector);

            // session.commit();
        } catch (JMSException e) {
            logger.error("error creating queue. {}", e);
        }
    }

    /**
     * subscribe the consumer. see {@link Flux.subscribe}
     */
    @Override
    public void subscribe(CoreSubscriber<? super T> s) {
        if (!subscribed.compareAndSet(false, true)) {
            throw new IllegalStateException("Only single subscriber is supported");
        }
        this.subscriber = s;
        JMSFluxSubscription x = new JMSFluxSubscription(this::onSubscriptionCancel);
        s.onSubscribe(x);
    }

    private void onSubscriptionCancel(JMSFluxSubscription subscription) {
        this.subscribed.set(false);
        this.subscriber = null;
    }

    /**
     * dispose the resources
     */
    public void dispose() {
        try {
            if (session != null) {
                session.close();
            }
        } catch (JMSException e) {
        } finally {
            session = null;
        }
    }

    /**
     * return a wrapper consumer that catches all exceptions
     * @param consumer
     * @return
     */

    public static <T> CatchConsumer<T> Catch(Consumer<? super T> consumer) {
        return Catch(consumer, JMSFlux.logger);
    }

    /**
     * return a wrapper consumer that catches all exceptions
     * 
     * @param consumer
     * @param logger a logger to report error
     * @return
     */
    public static <T> CatchConsumer<T> Catch(Consumer<? super T> consumer, Logger logger) {
        return new CatchConsumer<>(consumer, logger);
    }

}
